package com.xzc.apitest.state

import java.util

import com.xzc.apitest.source.SensorReading
import com.xzc.apitest.transform.MyReduceFunction
import org.apache.flink.api.common.functions.{RichFlatMapFunction, RichMapFunction}
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, MapState, MapStateDescriptor, ReducingState, ReducingStateDescriptor, ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

object StateTest {
  def main(args: Array[String]): Unit = {
    //创建执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val inputStream = env.socketTextStream("hadoop102", 7777);

    val dataStream = inputStream
      .map(data => {
        val arr = data.split(",")
        SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
      })

    //requirement: 对温度进行监控，如果跳变(前后2次)超出10度，报警
    val alertStream = dataStream
      .keyBy(_.id)
      //键控状态
      //      .flatMap(new TempChangeAlert(10.0))
      .flatMapWithState[(String, Double, Double), Double] {
        case (data: SensorReading, None) => (List.empty, Some(data.temperature))
        case (data: SensorReading, lastTemp: Some[Double]) => {
          val diff = (data.temperature - lastTemp.get).abs
          if (diff > 10.0) {
            (List((data.id, lastTemp.get, data.temperature)), Some(data.temperature))
          } else {
            (List.empty, Some(data.temperature))
          }
        }
      }

    alertStream.print()

    //执行
    env.execute("state test")
  }
}

//flatmap 1对多
class TempChangeAlert(threshold: Double) extends RichFlatMapFunction[SensorReading, (String, Double, Double)] {

  lazy val lastTempState: ValueState[Double] = getRuntimeContext.getState(
    new ValueStateDescriptor[Double]("last-temp", classOf[Double])
  )

  override def flatMap(in: SensorReading, out: Collector[(String, Double, Double)]): Unit = {
    val lastTemp = lastTempState.value()
    val diff = (in.temperature - lastTemp).abs
    if (diff > threshold)
      out.collect((in.id, lastTemp, in.temperature))
    lastTempState.update(in.temperature)
  }
}

//keyed state
class MyRichMapper extends RichMapFunction[SensorReading, String] {
  var valueState: ValueState[Double] = _
  lazy val listState: ListState[Int] = getRuntimeContext.getListState(
    new ListStateDescriptor[Int]("liststate", classOf[Int])
  )
  lazy val mapState: MapState[String, Double] = getRuntimeContext.getMapState(
    new MapStateDescriptor[String, Double]("mapstate", classOf[String], classOf[Double])
  )
  lazy val reduceState: ReducingState[SensorReading] = getRuntimeContext.getReducingState(
    new ReducingStateDescriptor[SensorReading]("reducestate", new MyReduceFunction, classOf[SensorReading])
  )

  override def map(in: SensorReading): String = {
    val myValue = valueState.value()
    valueState.update(in.temperature)
    listState.add(1)
    val list = new util.ArrayList[Int]()
    list.add(2)
    list.add(3)
    listState.addAll(list)
    listState.update(list)

    mapState.contains("_1")
    mapState.get("sensor_1")
    mapState.put("sensor_1", 1.3)

    reduceState.get()
    reduceState.add(in)

    in.id
  }

  override def open(parameters: Configuration): Unit = {
    valueState = getRuntimeContext.getState(
      new ValueStateDescriptor[Double]("valuestate", classOf[Double])
    )
  }
}
